RxJava 2.X 简单用法 for Android

Posted by pangrongxian on 2018-01-19
Android

RxAndroid

Gradle 添加依赖 :
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.8'

1. 创建被观察者

Observable 正常用法

private Observable<String> getObservable() {
    //return Observable.just("one", "tow");

    // Observable正常用法
    Observable mObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("one");
            e.onNext("tow");
            e.onComplete();
        }
    });

    return mObservable;

}

Observable 简约用法

private Observable<String> getObservable() {
    return Observable.just("Cricket", "Football");
}

2. 创建观察者

private Observer<String> getObserver() {

    return new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onNext(String value) {
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext : value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            textView.append(" onError : " + e.getMessage());
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onError : " + e.getMessage());
        }

        @Override
        public void onComplete() {
            textView.append(" onComplete");
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onComplete");
        }
    };
}

3. 订阅

getObservable()
        //指定被观察者执行的线程环境,指定为io线程
        .subscribeOn(Schedulers.io())

        //将后面执行的线程环境切换为主线程,
        //但是这一句依然执行在 io 线程
        .observeOn(AndroidSchedulers.mainThread())

        //订阅
        .subscribe(getObserver());

//当调用订阅操作(即调用Observable.subscribe()方法)的时候,
//被观察者才真正开始发出事件。

观察者模式 - 运作流程图

流程图

流程图相应代码实

Observable.just("getFilePath()")

            //被观察者执行的线程环境
            .subscribeOn(Schedulers.newThread())

            //将接下来执行的线程环境指定为io线程
            .observeOn(Schedulers.io())

            //使用map操作来完成类型转换
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String filePath) throws Exception {

                    //显然自定义的createBitmapFromPath(filePath)方法,是一个极其耗时的操作
                    return createBitmapFromPath(filePath);
                }
            })

            //将后面执行的线程环境切换为主线程
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    //创建观察者,作为事件传递的终点处理事件
                    new Observer<Bitmap>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            //当订阅后,会首先调用这个方法,其实就相当于onStart(),
                            //传入的Subscription s参数可以用于请求数据或者取消订阅
                        }

                        @Override
                        public void onNext(Bitmap bitmap) {
                            //处理事件
                            //showBitmap(bitmap)
                        }

                        @Override
                        public void onError(Throwable e) {
                            //出现错误会调用这个方法
                        }

                        @Override
                        public void onComplete() {
                            Log.d("DDDDDD","结束观察...\n");
                        }
                    }
            );




public Bitmap createBitmapFromPath(String pathUrl) {

    Bitmap bitmap = null;
    try {
        bitmap = Glide.with(this)
                .load(pathUrl)
                .asBitmap() //必须
                .centerCrop()
                .into(500, 500)
                .get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    return bitmap;

}

Comments: